package com.ikingtech.framework.sdk.cluster.client;

import com.ikingtech.framework.sdk.cluster.node.ClusterNode;
import com.ikingtech.framework.sdk.cluster.node.ClusterNodeManager;
import com.ikingtech.framework.sdk.cluster.node.handler.ClusterNodeHeartBeatMessage;
import com.ikingtech.framework.sdk.cluster.node.handler.ClusterNodeJoinMessage;
import com.ikingtech.framework.sdk.cluster.node.handler.ClusterNodeMessage;
import com.ikingtech.framework.sdk.utils.Tools;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * @author tie yan
 */
@Slf4j
@RequiredArgsConstructor
@Component
public class ClusterClient {

    private final List<Channel> clusterNodeConnections = new ArrayList<>();

    private final Bootstrap bootstrap = new Bootstrap();

    private final EventLoopGroup group = new NioEventLoopGroup();

    private final ClusterNodeManager nodeManager;

    public void init() {
        try {
            this.bootstrap.group(this.group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new ClusterClientHandler());
                        }
                    });

            List<ClusterNode> clusterNodes = this.nodeManager.getAll();
            if (Tools.Coll.isBlank(clusterNodes)) {
                return;
            }
            for (ClusterNode clusterNode : clusterNodes) {
                this.clusterNodeConnections.add(this.bootstrap.connect(clusterNode.getAddress(), clusterNode.getPort()).sync().channel());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void destroy() {
        this.group.shutdownGracefully();
    }

    public void newConnection(ClusterNode node) {
        ChannelFuture channelFuture;
        try {
            channelFuture = this.bootstrap.connect(node.getAddress(), node.getPort()).sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (!channelFuture.isSuccess()) {
            try {
                channelFuture.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                log.error("[CLUSTER]create connection to node fail[{}].", e.getMessage());
            }
        }
        this.clusterNodeConnections.add(channelFuture.channel());
    }

    public void publishJoinMessage(ClusterNode node) {
        this.broadcast(new ClusterNodeJoinMessage(node));
    }

    public void publishHeartBeatMessage() {
        this.broadcast(new ClusterNodeHeartBeatMessage());
    }

    public void broadcast(ClusterNodeMessage message) {
        this.clusterNodeConnections.forEach(serverConnection -> {
            ChannelFuture channelFuture = serverConnection.writeAndFlush(Tools.Json.toJsonStr(message));
            if (!channelFuture.isSuccess()) {
                try {
                    channelFuture.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    log.error("[CLUSTER]netty client of node(payload={}) broadcast fail[{}].", message, e.getMessage());
                }
            }
        });
    }
}
